Data Cleaning

Load the dataset

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

np.random.seed(42)

pio.renderers.default = "notebook"

spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Shared CSV reader options: header row, schema inference, multi-line
# quoted fields, and escaped embedded double quotes.
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "multiLine": "true",
    "escape": "\"",
}

jobs_df = spark.read.options(**csv_options).csv("./data/lightcast_job_postings.csv")
jobs_df.createOrReplaceTempView("job_postings")

elections_df = spark.read.options(**csv_options).csv("./data/2024_election_results.csv")
elections_df.createOrReplaceTempView("election_results")

#print("---This is Diagnostic check, No need to print it in the final doc---")

#df.printSchema() # comment this line when rendering the submission
#jobs_df.show(5)
#elections_df.show(5)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/10 18:31:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/10 18:31:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]                                                                                [Stage 1:>                                                          (0 + 1) / 1]                                                                                25/10/10 18:32:14 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Data Cleaning

# Cast numeric columns (inferred as strings from the CSV) to float.
numeric_columns = [
    "SALARY_FROM",
    "SALARY_TO",
    "MAX_YEARS_EXPERIENCE",
    "MIN_YEARS_EXPERIENCE",
    "SALARY",
]
for c in numeric_columns:
    jobs_df = jobs_df.withColumn(c, col(c).cast("float"))

# Strip embedded newlines/carriage returns from free-text list columns so
# downstream CSV export and display are not broken by multi-line values.
# (Replaces twelve near-identical withColumn statements with one loop.)
multiline_text_columns = [
    "EDUCATION_LEVELS_NAME",
    "SOURCE_TYPES",
    "SOURCES",
    "SKILLS",
    "SKILLS_NAME",
    "SPECIALIZED_SKILLS_NAME",
    "CERTIFICATIONS_NAME",
    "COMMON_SKILLS_NAME",
    "SOFTWARE_SKILLS_NAME",
    "CIP6_NAME",
    "CIP4_NAME",
    "CIP2_NAME",
]
for c in multiline_text_columns:
    jobs_df = jobs_df.withColumn(c, regexp_replace(col(c), "[\n\r]", ""))


# Compute and impute Median Salary
def compute_median(sdf, col_name):
    """Return the approximate median of *col_name* in *sdf*.

    Uses Spark's approxQuantile at the 0.5 quantile with 1% relative error.
    Returns None when the column has no non-null values (empty quantile list).
    """
    quantiles = sdf.approxQuantile(col_name, [0.5], 0.01)
    if not quantiles:
        return None
    return quantiles[0]


# Approximate medians used to impute missing salary fields.
median_from = compute_median(jobs_df, "SALARY_FROM")
median_to = compute_median(jobs_df, "SALARY_TO")
median_salary = compute_median(jobs_df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

# compute_median returns None for an all-null column, and DataFrame.fillna
# raises ValueError on a None replacement value — skip any missing medians
# instead of crashing.
_salary_fill = {
    name: value
    for name, value in (
        ("SALARY_FROM", median_from),
        ("SALARY_TO", median_to),
        ("SALARY", median_salary),
    )
    if value is not None
}
if _salary_fill:
    jobs_df = jobs_df.fillna(_salary_fill)

from pyspark.sql.functions import col
# Midpoint of the posted salary band.
jobs_df = jobs_df.withColumn(
    "MIDPOINT_SALARY",
    (col("SALARY_TO") + col("SALARY_FROM")) / 2
)

# Dropping unnecessary ID/code/duplicate columns; Spark's drop() silently
# ignores names that do not exist (e.g. the misspelled "COUNTY_INCOMMING"
# alongside "COUNTY_INCOMING" — both kept here so either spelling is removed).
columns_to_drop = [
    "ID", "URL", "ACTIVE_URLS", "DUPLICATES", "LAST_UPDATED_TIMESTAMP", "STATE",
    "COUNTY_OUTGOING", "COUNTY_INCOMMING", "MSA_OUTGOING", "MSA_INCOMING",
    "NAICS2", "NAICS3", "NAICS4", "NAICS5", "NAICS6", "ONET", "ONET_2019",
    "CIP6", "CIP4", "CIP2", "SOC_2021_2", "SOC_2021_3", "SOC_2021_4",
    "SOC_2021_5", "SOC_2", "SOC_3", "SOC_4", "SOC_5", "NAICS_2022_2",
    "NAICS_2022_3", "NAICS_2022_4", "NAICS_2022_5", "NAICS_2022_6",
    "CITY", "COUNTY", "MSA", "COUNTY_INCOMING"
]
jobs_df = jobs_df.drop(*columns_to_drop)

# configuring remote work groups
from pyspark.sql.functions import when, col, trim

# Map raw REMOTE_TYPE_NAME labels onto three groups. The original chain had
# two redundant branches ("Not Remote" and isNull both produced "Onsite",
# which otherwise() already covers) — behavior is unchanged without them:
# nulls and any unrecognized label fall through to "Onsite".
jobs_df = jobs_df.withColumn(
    "REMOTE_GROUP",
    when(trim(col("REMOTE_TYPE_NAME")) == "Remote", "Remote")
    .when(trim(col("REMOTE_TYPE_NAME")) == "Hybrid Remote", "Hybrid")
    .otherwise("Onsite")
)

# Drop duplicate postings: same title, company, location and posting date.
jobs_df = jobs_df.dropDuplicates(["TITLE", "COMPANY", "LOCATION", "POSTED"])

# handling missing data
from pyspark.sql.functions import col, when, sum as spark_sum

# Drop any column where more than half of the rows are null;
# SALARY is always retained regardless of its null rate.
total_rows = jobs_df.count()
missing_threshold = total_rows * 0.5
null_count_row = jobs_df.select(
    [spark_sum(col(name).isNull().cast("int")).alias(name) for name in jobs_df.columns]
).collect()[0]
null_counts = null_count_row.asDict()
retained_columns = [
    name
    for name, n_null in null_counts.items()
    if n_null <= missing_threshold or name == "SALARY"
]
jobs_df = jobs_df.select(retained_columns)

#jobs_df.show(15)
[Stage 4:>                                                          (0 + 1) / 1]                                                                                [Stage 5:>                                                          (0 + 1) / 1]                                                                                [Stage 6:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
[Stage 7:>                                                          (0 + 1) / 1]                                                                                [Stage 13:>                                                         (0 + 1) / 1][Stage 15:>                                                         (0 + 2) / 6][Stage 15:=========>                                                (1 + 2) / 6][Stage 15:===================>                                      (2 + 2) / 6][Stage 15:=============================>                            (3 + 2) / 6][Stage 15:======================================>                   (4 + 2) / 6][Stage 15:================================================>         (5 + 1) / 6]                                                                                
from pyspark.sql import functions as F

# Extract the two-letter state code from COUNTY_NAME.
# NOTE(review): assumes COUNTY_NAME is formatted "County, ST" — rows without
# a comma get a null STATE_ABBREVIATION (getItem(1) out of range). Confirm
# against the raw data.
jobs_df = jobs_df.withColumn("STATE_ABBREVIATION", F.trim(F.split(jobs_df["COUNTY_NAME"], ",").getItem(1)))

# Alias both sides so the join condition and the post-join drop can refer to
# each frame's STATE column unambiguously.
jobs_alias = jobs_df.alias("jobs")
elections_alias = elections_df.alias("elections")

# Left join keeps every job posting even when no election row matches.
jobs_df = jobs_alias.join(
    elections_alias,
    F.col("jobs.STATE_ABBREVIATION") == F.col("elections.STATE"),
    "left"
)
# Drop only the election side's STATE key (qualified column), keeping the
# postings' own state fields intact.
jobs_df = jobs_df.drop(F.col("elections.STATE"))

# Normalize the election frame's column name to the file's UPPER_CASE style.
jobs_df = jobs_df.withColumnRenamed("Affiliation", "AFFILIATION")

#jobs_df.show(15)
# Columns retained for the downstream analysis dataset.
analysis_columns = [
    "EDUCATION_LEVELS_NAME",
    "MIN_EDULEVELS_NAME",
    "EMPLOYMENT_TYPE_NAME",
    "MIN_YEARS_EXPERIENCE",
    "SALARY_TO",
    "SALARY_FROM",
    "SALARY",
    "CITY_NAME",
    "MSA_NAME",
    "STATE_NAME",
    "NAICS2_NAME",
    "NAICS3_NAME",
    "NAICS4_NAME",
    "NAICS5_NAME",
    "NAICS6_NAME",
    "SKILLS_NAME",
    "SPECIALIZED_SKILLS_NAME",
    "CERTIFICATIONS_NAME",
    "COMMON_SKILLS_NAME",
    "SOFTWARE_SKILLS_NAME",
    "ONET_NAME",
    "LOT_CAREER_AREA_NAME",
    "LOT_OCCUPATION_NAME",
    "LOT_SPECIALIZED_OCCUPATION_NAME",
    "LOT_OCCUPATION_GROUP_NAME",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME",
    "LOT_V6_OCCUPATION_NAME",
    "LOT_V6_OCCUPATION_GROUP_NAME",
    "LOT_V6_CAREER_AREA_NAME",
    "SOC_2_NAME",
    "SOC_3_NAME",
    "SOC_4_NAME",
    "SOC_5_NAME",
    "REMOTE_GROUP",
    "STATE_ABBREVIATION",
    "AFFILIATION",
    "MIDPOINT_SALARY",
]
selected_df = jobs_df.select(*analysis_columns)
import pandas as pd
from pyspark.sql.functions import col, sum as spark_sum, when, trim, length
import hvplot.pandas


# Visualize missingness on a reproducible 5% sample: melt the boolean
# null-mask into long form (row index x column) and render it as a heatmap.
sample_pdf = selected_df.sample(fraction=.05, seed=42).toPandas()

null_mask = sample_pdf.isnull()

long_form = null_mask.reset_index().melt(
    id_vars="index", var_name="column", value_name="is_missing"
)

# hvplot expects a numeric color dimension, so encode missing as 0/1.
long_form["is_missing"] = long_form["is_missing"].astype(int)

long_form.hvplot.heatmap(
    x="column",
    y="index",
    C="is_missing",
    cmap="Blues",
    width=900,
    height=500,
    title="Heatmap of Missing Values (5%)",
).opts(xrotation=45)
[Stage 20:>                                                         (0 + 1) / 1][Stage 22:>                                                         (0 + 2) / 2]                                                                                
from pyspark.sql.functions import countDistinct

# Report the number of distinct values in every retained column.
distinct_counts = selected_df.select(
    [countDistinct(name).alias(f"{name}_nunique") for name in selected_df.columns]
)
distinct_counts.show(truncate=False)

# Employment type: normalize raw labels to Flexible/Part-Time/Full-Time;
# unlabeled (null) postings default to Full-Time.
selected_df = selected_df.withColumn(
    "EMPLOYMENT_TYPE_NAME",
    when(col("EMPLOYMENT_TYPE_NAME") == "Part-time / full-time", "Flexible")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Part-time (≤ 32 hours)", "Part-Time")
    .when(col("EMPLOYMENT_TYPE_NAME") == "Full-time (> 32 hours)", "Full-Time")
    .when(col("EMPLOYMENT_TYPE_NAME").isNull(), "Full-Time")
    .otherwise(col("EMPLOYMENT_TYPE_NAME"))
)

# Per-column null-imputation defaults. Each column is independent, so one
# loop replaces the previous fourteen near-identical withColumn blocks:
# education fields get an explicit "No Education Listed" label, experience
# defaults to 0, salary fields fall back to the dataset medians, and the
# remaining categorical/location fields get "Unknown".
null_defaults = {
    "EDUCATION_LEVELS_NAME": "No Education Listed",
    "MIN_EDULEVELS_NAME": "No Education Listed",
    "MIN_YEARS_EXPERIENCE": 0,
    "SALARY_TO": median_to,
    "SALARY_FROM": median_from,
    "SALARY": median_salary,
    "CITY_NAME": "Unknown",
    "MSA_NAME": "Unknown",
    "STATE_NAME": "Unknown",
    "NAICS2_NAME": "Unknown",
    "NAICS3_NAME": "Unknown",
    "NAICS4_NAME": "Unknown",
    "NAICS5_NAME": "Unknown",
    "NAICS6_NAME": "Unknown",
    "STATE_ABBREVIATION": "Unknown",
}
for column_name, default_value in null_defaults.items():
    selected_df = selected_df.withColumn(
        column_name,
        when(col(column_name).isNull(), default_value).otherwise(col(column_name)),
    )
[Stage 24:>                                                         (0 + 1) / 1][Stage 26:>                                                         (0 + 2) / 2][Stage 26:=============================>                            (1 + 1) / 2][Stage 29:>                                                         (0 + 2) / 2][Stage 29:=============================>                            (1 + 1) / 2]                                                                                
+-----------------------------+--------------------------+----------------------------+----------------------------+-----------------+-------------------+--------------+-----------------+----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------------------+---------------------------+--------------------------+----------------------------+-----------------+----------------------------+---------------------------+---------------------------------------+---------------------------------+------------------------------------------+------------------------------+------------------------------------+-------------------------------+------------------+------------------+------------------+------------------+--------------------+--------------------------+-------------------+-----------------------+
|EDUCATION_LEVELS_NAME_nunique|MIN_EDULEVELS_NAME_nunique|EMPLOYMENT_TYPE_NAME_nunique|MIN_YEARS_EXPERIENCE_nunique|SALARY_TO_nunique|SALARY_FROM_nunique|SALARY_nunique|CITY_NAME_nunique|MSA_NAME_nunique|STATE_NAME_nunique|NAICS2_NAME_nunique|NAICS3_NAME_nunique|NAICS4_NAME_nunique|NAICS5_NAME_nunique|NAICS6_NAME_nunique|SKILLS_NAME_nunique|SPECIALIZED_SKILLS_NAME_nunique|CERTIFICATIONS_NAME_nunique|COMMON_SKILLS_NAME_nunique|SOFTWARE_SKILLS_NAME_nunique|ONET_NAME_nunique|LOT_CAREER_AREA_NAME_nunique|LOT_OCCUPATION_NAME_nunique|LOT_SPECIALIZED_OCCUPATION_NAME_nunique|LOT_OCCUPATION_GROUP_NAME_nunique|LOT_V6_SPECIALIZED_OCCUPATION_NAME_nunique|LOT_V6_OCCUPATION_NAME_nunique|LOT_V6_OCCUPATION_GROUP_NAME_nunique|LOT_V6_CAREER_AREA_NAME_nunique|SOC_2_NAME_nunique|SOC_3_NAME_nunique|SOC_4_NAME_nunique|SOC_5_NAME_nunique|REMOTE_GROUP_nunique|STATE_ABBREVIATION_nunique|AFFILIATION_nunique|MIDPOINT_SALARY_nunique|
+-----------------------------+--------------------------+----------------------------+----------------------------+-----------------+-------------------+--------------+-----------------+----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------------------+---------------------------+--------------------------+----------------------------+-----------------+----------------------------+---------------------------+---------------------------------------+---------------------------------+------------------------------------------+------------------------------+------------------------------------+-------------------------------+------------------+------------------+------------------+------------------+--------------------+--------------------------+-------------------+-----------------------+
|29                           |6                         |3                           |16                          |4429             |4143               |5986          |3840             |653             |51                |21                 |97                 |294                |600                |814                |43269              |40734                          |1571                       |29170                     |22180                       |1                |4                           |6                          |11                                     |6                                |11                                        |6                             |6                                   |4                              |1                 |1                 |1                 |1                 |3                   |51                        |2                  |5346                   |
+-----------------------------+--------------------------+----------------------------+----------------------------+-----------------+-------------------+--------------+-----------------+----------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------------------+---------------------------+--------------------------+----------------------------+-----------------+----------------------------+---------------------------+---------------------------------------+---------------------------------+------------------------------------------+------------------------------+------------------------------------+-------------------------------+------------------+------------------+------------------+------------------+--------------------+--------------------------+-------------------+-----------------------+
# Materialize the cleaned Spark frame in pandas and persist it to CSV.
output_path = "./data/lightcast_cleaned.csv"
pdf = selected_df.toPandas()

pdf.to_csv(output_path, index=False)

# Preview the first rows (rendered by the notebook).
pdf.head(15)

print("Data Cleaning Complete. Rows retained:", len(pdf))
[Stage 35:>                                                         (0 + 1) / 1][Stage 37:>                                                         (0 + 2) / 2]                                                                                
Data Cleaning Complete. Rows retained: 69198